package cn.ryh.kafka.code.consummer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 * 指定 Offset 消费
 *
 * @author Raoyh
 * @Description
 * @create 2022-07-26 10:14
 */
public class CustomConsumerSeek {
    public static void main(String[] args) {
        // 1.创建消费者的配置对象
        Properties properties = new Properties();
        // 2.给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "119.3.50.65:8092");
        // 配置序列化 必须
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组（组名任意起名） 必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // 创建消费者对象
        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            // 注册要消费的主题（可以消费多个主题）
            ArrayList<String> topics = new ArrayList<>();
            topics.add("first");
            kafkaConsumer.subscribe(topics);
            // 指定 Offset 消费
            assignOffset(kafkaConsumer);
            // 指定时间消费
            assignTime(kafkaConsumer);
            // 拉取数据打印
            while (true) {
                // 设置 1s 中消费一批数据
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
                // 打印消费到的数据
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }

    }

    /**
     * 指定时间消费
     *
     * @param kafkaConsumer
     */
    private static void assignTime(KafkaConsumer<String, String> kafkaConsumer) {
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息（有了分区分配信息才能开始消费）
            assignment = kafkaConsumer.assignment();
        }
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>(16);
        // 封装集合存储，每个分区对应一天前的数据
        for (TopicPartition topicPartition : assignment) {
            timestampToSearch.put(topicPartition, System.currentTimeMillis() - 24 * 3600 * 1000);
        }
        // 获取从 1 天前开始消费的每个分区的 offset
        Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);
        // 遍历每个分区，对每个分区设置消费时间。
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = offsets.get(topicPartition);
            // 根据时间指定开始消费的位置
            if (offsetAndTimestamp != null) {
                // offsetAndTimestamp.offset() 根据时间获取对应的offset
                kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
            }
        }

    }

    /**
     * 指定 Offset 消费
     *
     * @param kafkaConsumer
     */
    private static void assignOffset(KafkaConsumer<String, String> kafkaConsumer) {
        // 获取消费者分区分配信息
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息（有了分区分配信息才能开始消费）
            assignment = kafkaConsumer.assignment();
        }
        // 遍历所有分区，并指定 offset 从 1700 的位置开始消费
        for (TopicPartition tp : assignment) {
            kafkaConsumer.seek(tp, 1700);
        }
    }
}
